5.3 - Code - The Micro Bot Environment
This file, `micro_bot.py`, provides the complete implementation for our combat micro-management task. It defines the `MicroBot` and `MicroEnv` classes, which create a controlled 1v1 kiting scenario for our agent to learn in.
This code is the final translation of our combat design specification into a functional Gymnasium environment.
The Code: micro_bot.py
# micro_bot.py
import numpy as np
import gymnasium as gym
from gymnasium.spaces import Box, Discrete
import queue # For handling queue empty exceptions
from sc2.bot_ai import BotAI
from sc2.ids.unit_typeid import UnitTypeId
from sc2.unit import Unit
from sc2_gym_env import SC2GymEnv # Import our custom wrapper
class MicroBot(BotAI):
    """BotAI actor for learning to kite in a controlled 1v1 scenario.

    The bot spawns one Marine for itself and one Zergling for the opponent,
    then talks to the learning agent through two queues: actions are polled
    from ``action_queue`` and ``(observation, reward, terminated, truncated,
    info)`` tuples are pushed onto ``obs_queue``.
    """

    def __init__(self, action_queue, obs_queue):
        """Store the communication queues and reset the combat bookkeeping.

        Args:
            action_queue: Queue polled without blocking for the agent's next
                action (0 = attack, 1 = step away, 2 = move to the Zergling).
            obs_queue: Queue that receives one 5-tuple per processed action
                and one for the terminal state.
        """
        super().__init__()
        self.action_queue = action_queue
        self.obs_queue = obs_queue
        # A tag of 0 means "this combatant has not been located yet".
        self.marine_tag: int = 0
        self.zergling_tag: int = 0
        # Health seen at the previous reward computation.
        self.last_marine_hp: float = 0
        self.last_zergling_hp: float = 0

    async def on_start(self):
        """Use debug commands to create the 1v1 Marine vs. Zergling scenario.

        The Marine is placed on our start location and the Zergling 8 units
        away in the direction of the first enemy start location. Our workers
        are removed afterwards.
        """
        p = self.start_location
        await self.client.debug_create_unit([[UnitTypeId.MARINE, 1, p, 1]])
        await self.client.debug_create_unit(
            [[UnitTypeId.ZERGLING, 1, p.towards(self.enemy_start_locations[0], 8), 2]]
        )
        await self.client.debug_kill_unit(self.workers)

    def _get_units(self) -> tuple[Unit | None, Unit | None]:
        """Return the current ``(marine, zergling)`` Unit objects.

        Either element is ``None`` when no unit with the stored tag is present
        in the current observation.
        """
        marine = self.units.find_by_tag(self.marine_tag)
        zergling = self.enemy_units.find_by_tag(self.zergling_tag)
        return marine, zergling

    def _locate_combatants(self) -> bool:
        """Record tags and starting health once both combatants are visible.

        The units are selected by type rather than with ``.first`` on the raw
        collections, so a worker that is still alive is never mistaken for
        the Marine.

        Returns:
            True when both combatants have been identified (now or on an
            earlier step), False when at least one is still missing.
        """
        if self.marine_tag and self.zergling_tag:
            return True

        marines = self.units(UnitTypeId.MARINE)
        zerglings = self.enemy_units(UnitTypeId.ZERGLING)
        if not marines or not zerglings:
            return False

        marine, zergling = marines.first, zerglings.first
        self.marine_tag, self.zergling_tag = marine.tag, zergling.tag
        self.last_marine_hp = marine.health_max
        self.last_zergling_hp = zergling.health_max
        return True

    def _build_observation(self, marine: Unit, zergling: Unit) -> np.ndarray:
        """Build the 5-element float32 observation vector.

        Layout: Marine health fraction, weapon-on-cooldown flag, Zergling
        health fraction, distance divided by 15, and a flag that is 1.0 when
        the Marine is attacking with the Zergling in range.
        """
        return np.array([
            marine.health_percentage,
            1.0 if marine.weapon_cooldown > 0 else 0.0,
            zergling.health_percentage,
            marine.distance_to(zergling) / 15.0,  # Normalize distance
            1.0 if marine.is_attacking and marine.target_in_range(zergling) else 0.0,
        ], dtype=np.float32)

    async def _handle_terminal_state(self, marine_won: bool):
        """Send the final reward and leave the game.

        Args:
            marine_won: True yields a reward of +100, False yields -100.
        """
        reward = 100.0 if marine_won else -100.0
        # The final observation is an all-zero vector; terminated=True.
        self.obs_queue.put((np.zeros(5, dtype=np.float32), reward, True, False, {}))
        await self.client.leave()

    async def on_step(self, iteration: int) -> None:
        """Run one game step: detect win/loss, apply an action, emit feedback.

        Args:
            iteration: Step counter supplied by the game loop; the agent is
                only consulted on every fourth step.
        """
        # Units spawned by the debug commands in on_start are not guaranteed
        # to be part of the observation used for iteration 0. Wait until both
        # have been seen instead of treating their absence as a lost fight.
        # NOTE(review): nothing is put on obs_queue while waiting - confirm
        # SC2GymEnv does not block on an initial observation.
        if not self._locate_combatants():
            return

        marine, zergling = self._get_units()

        # A missing combatant after initialization means the fight is over.
        if marine is None or zergling is None:
            await self._handle_terminal_state(marine_won=(marine is not None))
            return

        # Throttle the RL loop.
        if iteration % 4 != 0:
            return

        try:
            action = self.action_queue.get_nowait()
        except queue.Empty:
            return  # No action from agent, continue.

        # Execute the agent's chosen maneuver; other values issue no command.
        if action == 0:
            self.do(marine.attack(zergling))
        elif action == 1:
            # A negative distance steps directly away from the Zergling.
            self.do(marine.move(marine.position.towards(zergling.position, -2)))
        elif action == 2:
            self.do(marine.move(zergling.position))

        # Health differential reward: damage dealt minus damage taken since
        # the previous reward computation.
        marine_hp, zergling_hp = marine.health, zergling.health
        reward = (self.last_zergling_hp - zergling_hp) - (self.last_marine_hp - marine_hp)
        self.last_marine_hp, self.last_zergling_hp = marine_hp, zergling_hp

        observation = self._build_observation(marine, zergling)
        self.obs_queue.put((observation, reward, False, False, {}))
class MicroEnv(SC2GymEnv):
    """Gymnasium environment that drives ``MicroBot`` on the AcropolisLE map.

    Actions are one of three discrete values. Observations are 5-element
    float32 vectors whose entries are bounded to [0, 1], except index 3,
    which has no upper bound.
    """

    def __init__(self):
        """Configure the wrapper, then declare the action and observation spaces."""
        super().__init__(bot_class=MicroBot, map_name="AcropolisLE")

        # Three discrete combat maneuvers.
        self.action_space = Discrete(3)

        # Every feature starts at 0; all but index 3 are capped at 1.
        lower_bounds = np.zeros(5, dtype=np.float32)
        upper_bounds = np.ones(5, dtype=np.float32)
        upper_bounds[3] = np.inf
        self.observation_space = Box(
            low=lower_bounds,
            high=upper_bounds,
            dtype=np.float32,
        )